繼續進行昨天未完成的課題
我們想要進行以下分析:
- 消費次數最多的贈予一支bear doll(已完成)
- 給予一次購買兩隻以上的Barbie交易結帳金額95%折的優待
- 購買五本字典的客戶贈予一支牙刷
- 贈予總消費金額最高的客戶一件睡衣
PS. 商品ID皆為已知,且贈予的物品也必須成為一筆金額為0的transaction
Barbie ID:25
思路:
找出交易紀錄中,商品ID=25
且數量>=2
的紀錄,將結帳金額\*0.95
,並更新回transByCust
:
更新回transByCust等於修改KV中的V值對吧,該用昨天講的哪個方法呢?
可以考慮使用mapValues(func):(對每個KV中的V執行某個func,但K值不變):
scala > transByCust = transByCust.mapValues(tran => { ①
if(tran(3).toInt == 25 && tran(4).toInt > 1) ②
tran(5) = (tran(5).toDouble * 0.95).toString ③
tran ④
})
①傳入一個函式,交易紀錄在函式以tran表示
②若商品ID(tran(3).toInt == 25
)與數量(tran(4).toDouble > 1
)吻合,進入③
③修改結帳金額(打95折)
④回傳交易紀錄
注意傳入的函式,處理方式是By每個KV中的V,所以撰寫函式時只需要考量如何處理好一個V即可。
因為之前將transByCust宣告成mutable的var,所以可以對裡面的元素進行修改操作。
字典ID:81
牙刷:70
思路:這次不是用mapValues
修改值了,有一種方式是用購買五本字典的客戶以上的客戶,然後用類似任務1的作法手動新增贈品的交易紀錄,但是任務1因為是找極值
所以筆數不會多,但如果購買字典5本字典以上的人很多怎麼辦XDD,因此我們要想個自動新增交易紀錄(贈品)的方式:
scala> transByCust = transByCust.flatMapValues(tran => {
if(tran(3).toInt == 81 && tran(4).toInt >= 5) { ①
val cloned = tran.clone() ②
cloned(5) = "0.00"; cloned(3) = "70"; cloned(4) = "1"; ③
List(tran, cloned) ④
}
else
List(tran) ⑤
})
可以用flatMapValues
,傳入一個會產生集合的函式,它可以讓一個元素變成多個元素,並且幫你攤平,等於由一筆紀錄變成多筆紀錄。
①判斷此筆紀錄是否符合條件
②若符合條件則clone一筆紀錄,不clone自己建一個String Array也OK
③更新陣列值
④回傳一個元素的集合物件,之後會攤平,所以要用一個List將兩個Array wrapper起來
⑤不符合條件的話只能wrapper原始的tran陣列
這題用了一個比較複雜的解法,但可以從範例中體會flatMapValues的用法
睡衣ID:63
因為每個人可能用多筆紀錄,該如何將每個人的紀錄各自加總呢?並且目前是String陣列,應該要轉換一下格式
先將紀錄轉換成簡單的(customerID,消費紀錄):
scala> val amouts = transByCust.mapValues(t=>t(5).toDouble)
amouts: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[6] at mapValues at <console>:30
沒錯,就是用mapValues
觀察一下:
scala> amouts.collect
res3: Array[(Int, Double)] = Array((51,9506.21), (99,4107.59), (79,2987.22), (51,7501.89), (86,8370.2), (63,1023.57), (23,5892.41), (49,9298.18), (97,9462.89), (94,4199.15), (59,5984.68), (8,1859.2)...
進入正題,該如何將每個人的消費金額加總,類似DB中的GroupBy搭配SUM呢?
scala> val total = amouts.reduceByKey(_+_)
total: org.apache.spark.rdd.RDD[(Int, Double)] = ShuffledRDD[7] at reduceByKey at <console>:32
就是reduceByKey
啦!
觀察一下:
scala> total.collect
total: Array[(Int, Double)] = Array((34,77332.59), (52,58348.020000000004), (96,36928.57), (4,41801.35), (16,40696.020000000004), (82,58722.58), (66,52130.009999999995), (28,45534.299999999996), (54,36307.04) ...
恩,要排序一下,還記得sortBy
嗎:
scala> val total = amouts.reduceByKey(_+_).collect.sortBy(_._2)
total: Array[(Int, Double)] = Array((60,17333.71), (48,17949.850000000002), (30,19194.91), (67,23201.989999999998), (6,30549.28), (44,31061.99), (29,31389.32), (19,33422.229999999996)....
OK,看來第1名消費王就是我們的ID60大戶XD~(堪誤,請看留言
)
新增一筆紀錄到前一天的建立的complTrans集合物件中
scala> complTrans = complTrans :+ Array("2015-03-30", "11:59 PM", "76",
"63", "1", "0.00")
:+
的用法在官網API文件描述為 def :+(elem: A): Array[A]
也就是在:+
左邊放上Array,右邊放上元素,則結果會將元素append在Array最後:
簡易範例:
scala> val a = List(1)
a: List[Int] = List(1)
scala> val b = a :+ 2
b: List[Int] = List(1, 2)
最後將complTrans寫入transByCust中,該如何做?
scala> transByCust = transByCust.union(sc.parallelize(complTrans).map(t =>
(t(2).toInt, t)))
sc.parallelize(complTrans)
將complTrans轉成RDD.map(t =>(t(2).toInt, t))
將complTrans轉成符合transByCust的格式:PairRDDunion
將合併兩個RDD感謝你的精采文章!
小小勘誤: total
在 sortBy(_._2)
之後應該是遞增排序,所以消費最高的 ID 應該是最後一筆 76 才對~
是的,不好意思!!,
我現在覺得list.sortWith(_._2 > _._2).head._1
做成遞減然後取第一筆的Key(也就是ID)似乎更好
K到第九天就明顯感受到用spark.read.textFile("xxx")與sc.textFile("xxx")的差異了!mapValues方法的實作: tran => {if(...) ...修改陣列值 回傳tran} 用sc讀入的話因為都是RDD,所以正常運作!不過用spark讀入的話, rdd.mapValues實作的話:
error: type mismatch:
found : org.apache.spark.rdd.RDD[(Int, Array[String])]
required: org.apache.spark.sql.Dataset[(Int, Array[String])]
所以RDD有辦法轉Dataset嗎???
PO 一下我用Java改寫完跑出來的結果: https://goo.gl/CKwGwb
文件有列出哪些人符合95折, 獲得牙刷, 及 flatMapValue與union後transByCust總數的變化!